package com.atguigu.flink.chapter11.function;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Locale;

/**
 * Demo of a user-defined scalar function in Flink's Table/SQL API.
 *
 * <p>{@link #main(String[])} builds a small in-memory stream of {@link WaterSensor}
 * records, exposes it as the temporary view {@code sensor}, registers
 * {@link MyToUpperCase} under the name {@code my_to_uppercase}, and prints the
 * result of calling it from a SQL query.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/12/22 10:19
 */
public class Flink01_Scala {
    /**
     * Runs the demo job and prints the query result to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Pin the REST port to 20000 for the locally started environment.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        DataStreamSource<WaterSensor> waterSensorStream =
            env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                             new WaterSensor("sensor_1", 2000L, 20),
                             new WaterSensor("sensor_2", 3000L, 30),
                             new WaterSensor("sensor_1", 4000L, 40),
                             new WaterSensor("sensor_1", 5000L, 50),
                             new WaterSensor("sensor_2", 6000L, 60));

        // Bridge the DataStream into the Table API and expose it to SQL as "sensor".
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        Table table = tEnv.fromDataStream(waterSensorStream);
        tEnv.createTemporaryView("sensor", table);

        // 1. Using the function from the Table API (kept for reference; these snippets
        //    need static imports of Expressions.$ and Expressions.call).
        // 1.1 Inline: pass the function class directly to call(...).
        /*table
            .select($("id"), call(MyToUpperCase.class, $("id")))
            .execute()
            .print();*/

        // 1.2 Register the function first, then call it by its registered name.
        /* tEnv.createTemporaryFunction("my_to_uppercase", MyToUpperCase.class);
        table
            .select($("id"), call("my_to_uppercase", $("id")))
            .execute()
            .print();
        */

        // 2. Using the function from SQL: register it, then reference it by name.
        tEnv.createTemporaryFunction("my_to_uppercase", MyToUpperCase.class);
        tEnv.sqlQuery("select id, my_to_uppercase(id) from sensor").execute().print();
    }

    /**
     * Scalar function that converts a string to upper case.
     */
    public static class MyToUpperCase extends ScalarFunction {
        /**
         * Upper-cases the given string.
         *
         * <p>The method must be {@code public} and named {@code eval}; the parameter
         * and return types are chosen to fit the use case.
         *
         * @param s the input string, may be {@code null}
         * @return {@code s} in upper case, or {@code null} when {@code s} is {@code null}
         */
        public String eval(String s) {
            if (s == null) {
                return null;
            }
            // Locale.ROOT keeps the result independent of the JVM default locale
            // (e.g. under a Turkish locale, "i" would otherwise become a dotted "İ").
            return s.toUpperCase(Locale.ROOT);
        }
    }
}
